package org.hepeng.workx.mybatis.event.listener;

import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.hepeng.workx.constant.ExecuteMode;
import org.hepeng.workx.mybatis.event.consumer.EventConsumer;
import org.hepeng.workx.mybatis.event.consumer.EventConsumerRegistry;
import org.hepeng.workx.mybatis.event.ExecuteEvent;
import org.hepeng.workx.util.concurrent.NamedThreadFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author he peng
 */
public abstract class AbstractExecuteEventListener implements ExecuteEventListener {

    private final SetMultimap<Class<? extends Object> , EventConsumer> EVENT_CONSUMER_MAP = Multimaps.synchronizedSetMultimap(Multimaps.newSetMultimap(new ConcurrentHashMap<>(), () -> new HashSet<>()));
    private final EventConsumerRegistry EVENT_CONSUMER_REGISTRY = new EventConsumerRegistry();

    private static final Executor EXECUTOR;
    protected String packageNames;

    static {
        ThreadFactory threadFactory = new NamedThreadFactory("WorkX-Mybatis-Execute-Event-Consumer" , true);
        EXECUTOR = new ThreadPoolExecutor(2 , 2 , 0L , TimeUnit.MILLISECONDS , new LinkedBlockingQueue<>() , threadFactory);
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                if (! ((ThreadPoolExecutor) EXECUTOR).isShutdown()) {
                    ((ThreadPoolExecutor) EXECUTOR).shutdownNow();
                }
            }
        } , "WorkX-Mybatis-Execute-Event-Consumer-ShutdownHook"));
    }

    public AbstractExecuteEventListener() {}

    public AbstractExecuteEventListener(boolean isOnlySpecifiedPackage , String packageNames) {
        this.packageNames = packageNames;
        addEventConsumer(isOnlySpecifiedPackage , packageNames);
    }

    @Override
    public void onExecuteEvent(ExecuteEvent event) {
        Observable<ExecuteEvent> observable = Observable.just(event);
        getConsumers(event.getClass() , true).forEach(consumer -> {
            ExecuteMode executeMode = consumer.getExecuteMode();
            if (Objects.equals(ExecuteMode.ASYNC , executeMode)) {
                observable.subscribeOn(Schedulers.from(EXECUTOR));
            }
            observable.subscribe(
                    data -> consumer.onConsume(data) ,
                    error -> consumer.onError(error),
                    () -> consumer.onComplete());
        });
    }

    @Override
    public void addEventConsumer(boolean isOnlySpecifiedPackage , String... packages) {
        EVENT_CONSUMER_REGISTRY.addConsumers(isOnlySpecifiedPackage , packages);
        for (Map.Entry<Class<?>, EventConsumer> consumerEntry : EVENT_CONSUMER_REGISTRY.entriesConsumer()) {
            addEventConsumer(consumerEntry.getKey(), consumerEntry.getValue());
        }
    }

    @Override
    public void addEventConsumer(Class<?> key , EventConsumer consumer) {
        EVENT_CONSUMER_MAP.put(key , consumer);
    }

    @Override
    public void addEventConsumer(Class<?> key , Set<EventConsumer> consumers) {
        EVENT_CONSUMER_MAP.putAll(key , consumers);
    }

    @Override
    public Set<EventConsumer> getConsumers(Class<?> key) {
        return getConsumers(key , false);
    }

    @Override
    public Set<EventConsumer> getConsumers(Class<?> key, boolean includeObjectClass) {
        Set<EventConsumer> consumers = Sets.newHashSet(EVENT_CONSUMER_MAP.get(key));
        if (includeObjectClass) {
            consumers.addAll(EVENT_CONSUMER_MAP.get(Object.class));
        }
        return consumers;
    }
}
